package cn.lokei.gateway.entity.kafka;

import java.util.Optional;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import org.springframework.kafka.support.KafkaHeaders;

import lombok.extern.slf4j.Slf4j;

/**
 * Kafka listeners for {@link KafkaProducer#TOPIC_TEST}.
 *
 * <p>Two listener methods subscribe to the same topic under different consumer group ids
 * ({@link KafkaProducer#TOPIC_GROUP1} and {@link KafkaProducer#TOPIC_GROUP2}). Each one logs
 * the record value and then acknowledges the record.
 *
 * <p>NOTE(review): the {@link Acknowledgment} parameter presumably requires the listener
 * container to be configured with a manual ack mode — confirm against the container factory
 * configuration.
 */
@Component
@Slf4j
public class KafkaConsumer {

  /**
   * Consumes records from {@link KafkaProducer#TOPIC_TEST} for group
   * {@link KafkaProducer#TOPIC_GROUP1}.
   *
   * @param record the received record; its value may be {@code null}
   * @param ack handle used to acknowledge the record
   * @param topic name of the topic the record was received from
   */
  @KafkaListener(topics = KafkaProducer.TOPIC_TEST, groupId = KafkaProducer.TOPIC_GROUP1)
  public void topic_test(ConsumerRecord<?, ?> record, Acknowledgment ack,
      @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
    consume("topic_test", record, ack, topic);
  }

  /**
   * Consumes records from {@link KafkaProducer#TOPIC_TEST} for group
   * {@link KafkaProducer#TOPIC_GROUP2}.
   *
   * @param record the received record; its value may be {@code null}
   * @param ack handle used to acknowledge the record
   * @param topic name of the topic the record was received from
   */
  @KafkaListener(topics = KafkaProducer.TOPIC_TEST, groupId = KafkaProducer.TOPIC_GROUP2)
  public void topic_test1(ConsumerRecord<?, ?> record, Acknowledgment ack,
      @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
    consume("topic_test1", record, ack, topic);
  }

  /**
   * Logs the record value (when present) and acknowledges the record.
   *
   * @param listenerName label of the calling listener, used as the log message prefix
   * @param record the received record
   * @param ack handle used to acknowledge the record
   * @param topic name of the topic the record was received from
   */
  private void consume(String listenerName, ConsumerRecord<?, ?> record, Acknowledgment ack,
      String topic) {
    Object payload = record.value();
    if (payload != null) {
      log.info("{} 消费了： Topic:{},Message:{}", listenerName, topic, payload);
    }
    // Acknowledge even when the value is null: there is nothing further to do with such a
    // record, and leaving it un-acked only risks it being delivered again later.
    ack.acknowledge();
  }
}